package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformCouponMessage;
import com.ruyuan.eshop.common.message.PlatformCouponUserBucketMessage;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.mq.splitter.ListSplitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormCouponUserBucketListener implements MessageListenerConcurrently {

    /**
     * 账户服务
     */
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    /**
     * 发送消息共用的线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    /**
     * rocketmq生产者
     */
    @Autowired
    private DefaultProducer defaultProducer;

    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {

        try {
            for (MessageExt messageExt : msgList) {

                // 1、反序列化消息
                String messageString = new String(messageExt.getBody());
                log.debug("执行平台发送优惠券用户桶消息逻辑，消息内容：{}", messageString);
                PlatformCouponUserBucketMessage message = JSON.parseObject(messageString, PlatformCouponUserBucketMessage.class);

                // 2、查询桶内的用户信息
                Long startUserId = message.getStartUserId();
                Long endUserId = message.getEndUserId();
                JsonResult<List<MembershipAccountDTO>> accountBucketResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
                if (!accountBucketResult.getSuccess()) {
                    throw new BaseBizException(accountBucketResult.getErrorCode(), accountBucketResult.getErrorMessage());
                }
                List<MembershipAccountDTO> accountBucket = accountBucketResult.getData();
                if (CollectionUtils.isEmpty(accountBucket)) {
                    log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
                    continue;
                }

                // 3、每个用户发送一条"平台发送优惠券消息"
                // 这里是并行消费的，以上逻辑已经是并行执行的了，而且有查库的操作
                // accountBucket 默认是 1000 个用户，要为每一个用户都发送一条"平台发送优惠券消息"，也就是1000条消息
                // 下面我们使用线程池来并行发送这1000条消息（ps：另一种也可以像发送优惠券用户桶消息一样用批量发送）
                PlatformCouponMessage couponMessage = PlatformCouponMessage.builder()
                        .couponId(message.getCouponId())
                        .activityStartTime(message.getActivityStartTime())
                        .activityEndTime(message.getActivityEndTime())
                        .couponType(message.getCouponType())
                        .build();
                List<String> messages = new ArrayList<>();
                for (MembershipAccountDTO account : accountBucket) {
                    couponMessage.setUserAccountId(account.getId());
                    messages.add(JSON.toJSONString(couponMessage));
                }
                log.info("本次推送消息数量,{}",messages.size());
                ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
                while (splitter.hasNext()){
                    List<String> sendBatch = splitter.next();
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, sendBatch, "平台发送优惠券消息");
                    });
                }
            }
        } catch (Exception e){
            log.error("consume error,平台优惠券消费失败", e);
            // 本次消费失败，下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
